HEX
Server: LiteSpeed
System: Linux eticaretsrv4.isimtescil.net 3.10.0-962.3.2.lve1.5.26.7.el7.x86_64 #1 SMP Wed Oct 2 07:53:12 EDT 2019 x86_64
User: sioberen (1086)
PHP: 7.3.33
Disabled: NONE
Upload Files
File: //opt/alt/python37/lib/python3.7/site-packages/lvestats/lib/commons/litespeed.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
import base64
import urllib.request
import urllib.error
import urllib.parse
import ssl
import os

from lvestats.lib.commons.proctitle import Proctitle


class LiteSpeedException(Exception):
    """
    Base class for the LiteSpeed related errors defined in this module.
    """


class LiteSpeedDisabledException(LiteSpeedException):
    """
    Raised when the LiteSpeed status report can not be fetched.
    """


class LiteSpeedInvalidCredentials(LiteSpeedException):
    """
    Raised when the LiteSpeed status page answers with HTTP 401 or 403.
    """


class LiteSpeedDataMapping(object):
    """
    Column indexes of the fields read from one row of the
    LiteSpeed status report.
    """
    # index of the field holding the request time
    TIME = 3
    # index of the field holding the host name
    HOST = 8
    # index of the field holding the request line
    REQUEST = 14

    # rows with fewer fields than this are skipped by the parser
    TOTAL_LEN = 15


class LiteSpeed(object):
    """
    Reads the LiteSpeed status report over HTTP (basic auth) and extracts
    the requests that belong to the domains of a given user.
    """
    # hosts that are never reported; compared against raw bytes from the report
    IGNORE_HOSTS = [b'_AdminVHost']
    PID_FILE_PATH = '/tmp/lshttpd/lshttpd.pid'
    HTPASSWD_PATH = '/usr/local/lsws/admin/htpasswds/status'
    STATUS_URL = 'http://localhost:7080/status?rpt=details'
    # timeout passed to urlopen()
    HTTP_TIMEOUT = 2

    def __init__(self, login, password):
        """
        :param login: user name used for basic auth on the status page
        :type login: str
        :param password: password used for basic auth on the status page
        :type password: str
        """
        self.login = login
        self.password = password

    @staticmethod
    def _get_litespeed_pid():
        """
        Returns pid that is stored in litespeed's pidfile.
        None is returned when the pid file or the status htpasswd file
        does not exist.
        :return: str or None
        """
        if not (os.path.isfile(LiteSpeed.PID_FILE_PATH)
                and os.path.isfile(LiteSpeed.HTPASSWD_PATH)):
            return None
        try:
            with open(LiteSpeed.PID_FILE_PATH) as f:
                return f.readline().rstrip(os.linesep)
        except FileNotFoundError:
            # pid file was removed between the isfile() check and open()
            return None

    @staticmethod
    def is_litespeed_running() -> bool:
        """
        Checks whether a litespeed pid could be read
        (see _get_litespeed_pid for the exact conditions).
        :return: bool
        """
        return LiteSpeed._get_litespeed_pid() is not None

    def _get_requests(self):
        """
        Get info about connections from litespeed
        and returns array of non-empty rows with data (bytes)
        :return: list
        :raise: [LiteSpeedInvalidCredentials, LiteSpeedDisabledException]
        """
        request = urllib.request.Request(self.STATUS_URL)

        credentials = b'%s:%s' % (self.login.encode(), self.password.encode())
        # base64 output is pure ASCII, so the header can be passed as str
        token = base64.b64encode(credentials).decode('ascii')
        request.add_header("Authorization", "Basic %s" % token)

        # get data from litespeed, any http error code is turned into
        # one of the LiteSpeed* exceptions
        try:
            # NOTE: ssl._create_unverified_context() is a private helper that
            # disables certificate checks; it only matters for https urls
            with urllib.request.urlopen(request, timeout=self.HTTP_TIMEOUT,
                                        context=ssl._create_unverified_context()) as response:
                body = response.read()
        except urllib.error.HTTPError as e:
            if e.code in (401, 403):
                raise LiteSpeedInvalidCredentials("Litespeed login / password invalid. "
                                                  "Please, try restart lvestats service.") from e
            raise LiteSpeedDisabledException(str(e)) from e
        except Exception as e:  # not good, but urllib raises lot of exceptions
            raise LiteSpeedDisabledException(str(e)) from e

        # remove empty lines
        return [row for row in body.split(os.linesep.encode()) if row.strip()]

    def __is_host_valid(self, host) -> bool:
        """
        Check whether host is not empty (after stripping whitespace)
        and is not listed in IGNORE_HOSTS.
        :type host: bytes
        :return: bool
        """
        host = host.strip()
        return bool(host) and host not in self.IGNORE_HOSTS

    def _get_user_domains(self, username):
        """
        Returns user domains or empty array if nothing found
        :param username: user name
        :return: user's domains list, every domain encoded to bytes
        """
        domains = Proctitle._get_domains(username) or []
        return [domain.encode() for domain in domains]

    def _parse_request_info(self, request: bytes):
        """
        Splits a request line (optionally wrapped in double quotes)
        into its whitespace separated parts.
        :return: (method, url, http_version); http_version is b'' when
                 the line has only two parts; None for any other number
                 of parts
        """
        parts = request.strip(b'"').split()
        if len(parts) == 3:
            method, url, http_version = parts
        elif len(parts) == 2:
            method, url = parts
            http_version = b''
        else:
            return None
        return method, url, http_version

    def get_user_data(self, username):
        """
        Returns information about processed by user pages.
        :param username:
        :return list[tuple]:
        list of the tuples
        [(Pid, Domain, Http type, Path, Http version, Time),...]
        :raises: LiteSpeedInvalidCredentials, LiteSpeedDisabledException
        """
        data_delimiter = b'\t'

        # set gives constant time membership checks inside the loop
        user_domains = frozenset(self._get_user_domains(username))
        pid = self._get_litespeed_pid()

        litespeed_requests = []
        for row in self._get_requests():
            fields = row.split(data_delimiter)
            if len(fields) < LiteSpeedDataMapping.TOTAL_LEN:
                # that is not valid request info, skip it...
                continue

            host = fields[LiteSpeedDataMapping.HOST]
            if not self.__is_host_valid(host) or host not in user_domains:
                continue

            request_data = self._parse_request_info(fields[LiteSpeedDataMapping.REQUEST])
            if request_data is None:
                continue
            method, url, http_version = request_data

            # time since first request, seconds
            request_time = self.to_float(fields[LiteSpeedDataMapping.TIME])
            litespeed_requests.append((pid, host, method, url, http_version, request_time))

        return litespeed_requests

    @staticmethod
    def to_float(string) -> float:
        """
        Converts value to float, if can't return -1.
        :type string: str or bytes
        :rtype: float
        """
        try:
            return float(string)
        except (TypeError, ValueError):
            # TypeError covers values float() can not accept at all (e.g. None)
            return -1.